package org.dragonnova.business.web;

import java.io.IOException;
import java.util.Map;
import java.util.Set;

import org.dragonnova.business.common.Constants;
import org.dragonnova.business.message.MessageFactory;
import org.dragonnova.business.message.mq.kafka.KafkaMessageListener;
import org.dragonnova.business.message.mq.kafka.KafkaMessageListener.SimpleListener;
import org.dragonnova.business.message.mq.kafka.Listener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;

import com.base.pub.websocket.SimpleWebSocketHandler;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

public class TagGlobalWebSocketHandler extends SimpleWebSocketHandler {

	private final static Logger LOGGER = LoggerFactory
			.getLogger(TagGlobalWebSocketHandler.class);

	private static final String ALARM_URL_MAPPING = "webSocket/global.ws";

	@Autowired
	private KafkaMessageListener messageListener;

	@Autowired
	private MessageFactory messageFactory;

	private Listener alarmTagListener;

	@Override
	public void afterConnectionEstablished(WebSocketSession session)
			throws Exception {
		super.initSession(session);

		if (alarmTagListener == null) {
			alarmTagListener = new AlarmTagListener();
			messageListener.addListener(alarmTagListener);
		}
	}

	@Override
	protected void handleTextMessage(WebSocketSession session,
			TextMessage message) throws Exception {

	}

	@Override
	public void afterConnectionClosed(WebSocketSession wss, CloseStatus cs)
			throws Exception {
		LOGGER.warn("websocket connection closed...... " + cs.getCode());
		super.closeSession(wss);
	}

	@SuppressWarnings("static-access")
	private void sendStatMessage(String topic, String textJson, long timestamp) {
		try {
			sendMessage(getWebSocketSessionManager().packData(
					Constants.RESULT_WARN_INFO,
					"统计",
					messageFactory.transfer(messageFactory, topic, textJson,
							timestamp)));
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	private void sendCountMessage(String topic, long count) {
		Map<String, Object> data = Maps.newHashMap();
		data.put(topic, count);
		try {
			sendMessage(getWebSocketSessionManager().packData(
					Constants.RESULT_WARN_INFO, Constants.TOPIC_COUNT, data));
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	private class AlarmTagListener implements SimpleListener<Object, Long> {

		private Map<String, Long> topicTimestamp = Maps.newHashMap();

		@Override
		public Set<String> getTopics() {
			return Sets.newHashSet(Constants.TOPIC_NETFLOW,
					Constants.TOPIC_ALARM, Constants.TOPIC_PAY);
		}

		@Override
		public boolean onMessage(String topic, Object data, Long timestamp) {
			if (data instanceof String) {
				if (topic.equals(Constants.TOPIC_NETFLOW)) {
					sendStatMessage(topic, (String) data, timestamp);
					return true;
				}
			}
			return false;
		}

		@Override
		public void onMessageCount(String topic, long count, long timestamp) {
			sendCountMessage(topic, count);

			long oldTs = !topicTimestamp.containsKey(topic) ? 0
					: topicTimestamp.get(topic);
			if (Long.compare(timestamp, oldTs) >= 0) {
				topicTimestamp.put(topic, timestamp);
			}
		}

		@Override
		public long getLastConsumerTimestamp(String topic) {
			return !topicTimestamp.containsKey(topic) ? 0 : topicTimestamp
					.get(topic);
		}
	}

}
